[SPARK-54663][CORE] Computes RowBasedChecksum in ShuffleWriters #50230
checksumValue = checksumValue ^ rowChecksumValue
} catch {
  case NonFatal(e) =>
    logInfo("Checksum computation encountered error: ", e)
Suggested change:
- logInfo("Checksum computation encountered error: ", e)
+ logError("Checksum computation encountered error: ", e)
Done
final long checksum = RowBasedChecksum.getAggregatedChecksumValue(rowBasedChecksums);
return checksum;
Suggested change:
- final long checksum = RowBasedChecksum.getAggregatedChecksumValue(rowBasedChecksums);
- return checksum;
+ return RowBasedChecksum.getAggregatedChecksumValue(rowBasedChecksums);
Done
  val mapSideCombine: Boolean = false,
- val shuffleWriterProcessor: ShuffleWriteProcessor = new ShuffleWriteProcessor)
+ val shuffleWriterProcessor: ShuffleWriteProcessor = new ShuffleWriteProcessor,
+ val rowBasedChecksums: Array[RowBasedChecksum] = Array.empty)
Array.empty is not a constant but a function. Shall we create a constant of empty Array[RowBasedChecksum] in object ShuffleDependency?
Done
| "partition produce different output data or not (same set of keyValue pairs). In case " + | ||
| "the output data has changed across retries, Spark will need to retry all tasks of the " + | ||
| "consumer stages to avoid correctness issues.") | ||
| .version("4.1.0") |
| .version("4.1.0") | |
| .version("4.0.0") |
The proposed change is not specific to SQL - should we make it a Spark config instead?
@cloud-fan, it is too late for 4.0 - let us move it to 4.1
final private class MyByteArrayOutputStream(size: Int)
  extends ByteArrayOutputStream(size) {
  def getBuf: Array[Byte] = buf
}
We have another MyByteArrayOutputStream used for this purpose.
Refactor to reuse it.
Done
Resolved review threads (collapsed):
- core/src/main/java/org/apache/spark/shuffle/checksum/RowBasedChecksum.scala (outdated, 3 threads)
- core/src/main/java/org/apache/spark/shuffle/sort/BypassMergeSortShuffleWriter.java
| "partition produce different output data or not (same set of keyValue pairs). In case " + | ||
| "the output data has changed across retries, Spark will need to retry all tasks of the " + | ||
| "consumer stages to avoid correctness issues.") | ||
| .version("4.1.0") |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The change proposed is not specific to SQL - make it a spark config instead ?
| "partition produce different output data or not (same set of keyValue pairs). In case " + | ||
| "the output data has changed across retries, Spark will need to retry all tasks of the " + | ||
| "consumer stages to avoid correctness issues.") | ||
| .version("4.1.0") |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@cloud-fan, it is too late for 4.0 - let us move it to 4.1
  if (mapStatuses(mapIndex) != null) mapStatuses(mapIndex) else mapStatusesDeleted(mapIndex)
if (preStatus != null && preStatus.checksumValue != status.checksumValue) {
  checksumMismatchIndices.add(mapIndex)
}
There are three main cases here:
- task reattempt due to stage reattempt after downstream stages have consumed output.
- task reattempt due to stage reattempt before downstream stages have consumed output (missing partitions detected during stage attempt completion).
- speculative tasks.
For the latter two, we don't need to track it in checksumMismatchIndices.
Yes, for case 1, we need to track the mismatches. The purpose of checksumMismatchIndices is that (in the next PR) we will roll back the downstream stages if we detect checksum mismatches in their upstream stages.
For case 2, if the downstream stages have not consumed the output, it means they have not started. In this case, the rollback is a no-op, and it doesn't hurt to record the mismatches here.
For case 3, I think we need to record the mismatches. Consider a situation where all partitions of a stage have finished while some speculative tasks are still running. As all outputs have been produced, the downstream stage can start and read the data. Later, some speculative tasks finish, and the new mapStatus will override the old mapStatus with a new data location. For the downstream stage, the not-yet-started or retried tasks would read the new data, while the finished and running tasks would read the old data, resulting in inconsistency.
For case 2, if the downstream stages have not consumed the output, it means they have not started. In this case, the rollback is a no-op, and it doesn't hurt to record the mismatches here.
It is unclear how checksumMismatchIndices will be used - perhaps it is fine to record it, but my question is why record it at all?
Is it due to the complexity of detecting case (2)?
For case 3, I think we need to record the mismatches. Consider a situation where all partitions of a stage have finished while some speculative tasks are still running. As all outputs have been produced, the downstream stage can start and read the data. Later, some speculative tasks finish, and the new mapStatus will override the old mapStatus with a new data location. For the downstream stage, the not-yet-started or retried tasks would read the new data, while the finished and running tasks would read the old data, resulting in inconsistency.
That is fair, this is indeed possible.
if (!hasError) {
  try {
    val rowChecksumValue = calculateRowChecksum(key, value)
    checksumValue = checksumValue ^ rowChecksumValue
XOR has problems when the same (key, value) pair is used multiple times. Should we track the number of pairs as well?
I think this is a good point but hard to compute as this will be a bit more stateful.
What about calculating a SUM in addition to the bitwise XOR (currently checksumValue), and when getValue is called, combining those two into one number with an extra XOR (or just adding them together after multiplying one by a prime number)?
Yeah, that's what I was referring to. But combining the number of pairs (count, not the sum) into the final checksum should be fine.
Update: No, combining just the count of pairs into the final checksum still has problems with duplicates.
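For illustration only (not code from this PR), here is a tiny Scala example of why XOR plus a count is not enough: two different multisets of row hashes can collide on both. The literal hash values below are made up.
// Hypothetical values standing in for calculateRowChecksum(key, value) outputs:
val x = 0x1234L; val y = 0x5678L; val z = 0x9abcL
val a = Seq(x, x, z) // row hashes from attempt 1
val b = Seq(y, y, z) // row hashes from attempt 2 (different rows)
// Duplicates cancel under XOR, so both attempts yield the same XOR and the same count...
assert(a.reduce(_ ^ _) == b.reduce(_ ^ _) && a.size == b.size)
// ...while a SUM component still tells them apart.
assert(a.sum != b.sum)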
Perhaps something like this might work ? https://en.wikipedia.org/wiki/Fowler%E2%80%93Noll%E2%80%93Vo_hash_function
It will be more expensive than xor, but should handle order and duplication.
@mridulm I think what we need is order insensitivity (within one partition the order of rows should not matter); FNV, as I see it, is sensitive to order.
You are right, order sensitivity matters at the beginning of the task, not at the end!
Sum + XOR, or sum + XOR + multiplication with some XOR folding to generate the final hash, might be cheap.
I can't think of other alternatives that would work well and yet be reasonably robust to duplication.
Hi folks, I am working with Jiexing to follow up on the PR.
Do you think something like the code below, combining both the sum and the XOR of the row hash codes, would help address the concerns? cc @peter-toth @attilapiros @mridulm @cloud-fan
private var checksumValue: Long = 0
private var sum: Long = 0

// Rotate-left on a 64-bit Long.
def rotateLeft(value: Long, k: Int): Long = {
  (value << k) | (value >>> (64 - k))
}

def getValue: Long = {
  if (!hasError) {
    checksumValue ^ rotateLeft(sum, 27)
  } else {
    0
  }
}

def update(key: Any, value: Any): Unit = {
  ...
  val rowChecksumValue = calculateRowChecksum(key, value)
  checksumValue = checksumValue ^ rowChecksumValue
  sum += rowChecksumValue
  ...
}
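(A small note on the snippet above: the manual shift/or rotation is equivalent to java.lang.Long.rotateLeft(value, k), which could be used directly, and rotating the SUM before folding it in keeps the two accumulators from cancelling each other in trivial ways.)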
 * output data or not.
 */
private final RowBasedChecksum[] rowBasedChecksums;
private final SparkConf conf;
This conf is not needed.
removed
this.shuffleExecutorComponents = shuffleExecutorComponents;
this.partitionChecksums = createPartitionChecksums(numPartitions, conf);
this.rowBasedChecksums = dep.rowBasedChecksums();
this.conf = conf;
Same here: this conf is not needed.
removed
  return partitionLengths;
}

public RowBasedChecksum[] getRowBasedChecksums()
NIT: a comment to say it is for testing only.
Done
}

object RowBasedChecksum {
  def createPartitionRowBasedChecksums(
NIT: add a comment to say it is for testing only, or better, move it to a helper class used only in the tests.
Maybe into the ShuffleChecksumTestHelper? But its name suggests it is only for shuffle checksums, so what about also renaming it to ChecksumTestHelper?
oh I see the comment above:
Note that this checksum computation is very expensive, and it is used only in tests
in the core component. A much cheaper implementation of RowBasedChecksum is in
UnsafeRowChecksum.
And I can see your comment:
I can't use UnsafeRowChecksum.scala in the test because the test is in core, while the UnsafeRow is in sql. So I added OutputStreamRowBasedChecksum for the tests in core.
But you can move this class and object to the test code of the core module, can't you?
I see you cannot move the whole object so let's just move the method.
Moved the method to ShuffleChecksumTestHelper; didn't rename the class since all the newly added classes/components are currently in the shuffle package.
Resolved review thread (collapsed, outdated): core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
def createPartitionRowBasedChecksums(
    numPartitions: Int,
    checksumAlgorithm: String): Array[RowBasedChecksum] = {
Given it's test-only, do we really need to respect the checksum algorithm config? Can we just pick the simplest algorithm?
Sounds good. As we don't compare and verify different checksum algorithms here, I set it to ADLER32, which is the simplest in the valid set.
Resolved review threads (collapsed, outdated):
- core/src/test/scala/org/apache/spark/shuffle/sort/BypassMergeSortShuffleWriterSuite.scala
- core/src/test/scala/org/apache/spark/shuffle/sort/SortShuffleWriterSuite.scala
val checksumAlgorithm = conf.get(config.SHUFFLE_CHECKSUM_ALGORITHM)
val rowBasedChecksums: Array[RowBasedChecksum] =
  createPartitionRowBasedChecksums(checksumSize, checksumAlgorithm)
when(dependency.rowBasedChecksums).thenReturn(rowBasedChecksums)
This is just mocking a ShuffleDependency for testing; we can always return the row-based checksums.
Since this test is not about rowBasedChecksum, I set it to empty instead.
Resolved review thread (collapsed, outdated): core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
I want to understand better why this is only applicable to SQL applications - more here.
private var partitionLengths: Array[Long] = _

def getRowBasedChecksums: Array[RowBasedChecksum] = {
  if (sorter != null) sorter.getRowBasedChecksums else new Array[RowBasedChecksum](0)
nit: We can use ShuffleDependency.EMPTY_ROW_BASED_CHECKSUMS here.
done
}

object ShuffleDependency {
  private val EMPTY_ROW_BASED_CHECKSUMS: Array[RowBasedChecksum] = Array.empty
nit: We can make this private[spark] and use it in other places within this PR
done
val partitionId = actualPartitioner.getPartition(kv._1)
map.changeValue((partitionId, kv._1), update)
maybeSpillCollection(usingMap = true)
if (!rowBasedChecksums.isEmpty) {
super nit:
(here and other places)
Suggested change:
- if (!rowBasedChecksums.isEmpty) {
+ if (rowBasedChecksums.nonEmpty) {
Done, updated.
override protected def calculateRowChecksum(key: Any, value: Any): Long = {
  assert(
    value.isInstanceOf[UnsafeRow],
    "Expecting UnsafeRow but got " + value.getClass.getName)
@cloud-fan Do we need this assert check?
I saw a few cases where it exists, but it appears to be fairly rare in the sql code.
It's just for a better error message, as the code below does value.asInstanceOf[UnsafeRow].
Hi @mridulm, please take another look when you get a chance. Let me know if I missed anything. Thanks.
mridulm left a comment:
LGTM
Merged to master.
[Figure "example 1": https://github.com/user-attachments/assets/549d1d90-3a8c-43e3-a891-1a6c614e9f24]
[Figure "example 2": https://github.com/user-attachments/assets/bebf03d5-f05e-46b6-8f78-bfad08999867]
[Figure "example 3": https://github.com/user-attachments/assets/730fac0f-dfc3-4392-a74f-ed3e0d11e665]
[Figure "example 4": https://github.com/user-attachments/assets/a501a33e-97bb-4a01-954f-bc7d0f01f3e6]

Closes apache#50230 from JiexingLi/shuffle-checksum.

Lead-authored-by: Tengfei Huang <[email protected]>
Co-authored-by: Jiexing Li <[email protected]>
Co-authored-by: Wenchen Fan <[email protected]>
Signed-off-by: Mridul Muralidharan <mridul<at>gmail.com>
What changes were proposed in this pull request?
This PR computes a RowBasedChecksum for ShuffleWriters, controlled by spark.shuffle.rowbased.checksum.enabled.
If enabled, Spark calculates the RowBasedChecksum values for each partition of each map output and returns the values from the executors to the driver. Unlike the existing shuffle checksum, RowBasedChecksum is independent of the input row order and is used to detect whether different task attempts of the same partition produce different output data (keys or values). In case the output data has changed across retries, Spark will need to retry all tasks of the consumer stage to avoid correctness issues.
This PR contains only the RowBasedChecksum computation. In the next PR, I plan to trigger the full stage retry when we detect checksum mismatches.
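For example, the feature could be enabled through the usual Spark configuration mechanism. The snippet below is illustrative only (the app name and local master are placeholders); the config key is taken from this PR.
import org.apache.spark.{SparkConf, SparkContext}

// Illustrative sketch: turn on the row-based shuffle checksum added in this PR.
val conf = new SparkConf()
  .setAppName("row-based-checksum-demo") // hypothetical app name
  .setMaster("local[*]")                 // local run for demonstration
  .set("spark.shuffle.rowbased.checksum.enabled", "true")
val sc = new SparkContext(conf)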
Why are the changes needed?
Problem:
Spark's resilience features can cause an RDD to be partially recomputed, e.g. when an executor is lost due to downscaling or a spot-instance kill. When the output of a nondeterministic task is recomputed, Spark does not always recompute everything that depends on this task's output. In some cases, some subsequent computations are based on the output of one "attempt" of the task, while other subsequent computations are based on another "attempt".
This can be problematic when the producer stage is non-deterministic: the second attempt of the same task can produce output that is very different from the first one. For example, if the stage uses round-robin partitioning, some of the output data could be placed in different partitions in different task attempts. This can lead to incorrect results unless we retry the whole consumer stage that depends on the retried non-deterministic stage. Below is an example.
Example:
Let’s say we have Stage 1 and Stage 2, where Stage 1 is the producer and Stage 2 is the consumer. Assume that the data produced by Task 2 was lost for some reason while Stage 2 is executing. Further assume that at this point, Task 1 of Stage 2 has already fetched all of its input and finished, while Task 2 of Stage 2 fails with data fetch failures.
Task 2 of Stage 1 will be retried to reproduce the data, after which Task 2 of Stage 2 is retried. Eventually, Task 1 and Task 2 of Stage 2 produce the result, which contains all 4 tuples {t1, t2, t3, t4} as shown in the example graph.
Now, let’s assume that Stage 1 is non-deterministic (e.g., when using round-robin partitioning and the input data is not ordered), and that Task 2 places tuple t3 in Partition 1 and tuple t4 in Partition 2 in its first attempt, but places tuple t4 in Partition 1 and tuple t3 in Partition 2 in its second attempt. When Task 2 of Stage 2 is retried, instead of reading {t2, t4} as it should, it reads {t2, t3} as its input. The result generated by Stage 2 is {t1, t2, t3, t3}, which is incorrect.
The problem can be avoided if we retry all tasks of Stage 2. As all tasks then read consistent data, we produce the result correctly, regardless of how the retry of Stage 1 Task 2 partitions the data.
Proposal:
To avoid correctness issues caused by partially retrying a non-deterministic stage, we propose an approach that first tries to detect inconsistent data generated by different task attempts of a non-deterministic stage - for example, whether all the data partitions generated by Task 2 in the first attempt are the same as those generated by the second attempt. We retry the entire consumer stages if inconsistent data is detected. A rough sketch of the idea is shown below.
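As an illustration only (a sketch under assumptions, not this PR's actual RowBasedChecksum or UnsafeRowChecksum implementation; the class name and hash choice below are made up), an order-insensitive per-partition checksum can combine commutative aggregations of per-row hashes, such as XOR and SUM, so that the same multiset of rows yields the same value regardless of arrival order:
import scala.util.hashing.MurmurHash3

// Hypothetical sketch of an order-insensitive row checksum.
class OrderInsensitiveRowChecksum {
  private var xorAcc: Long = 0L
  private var sumAcc: Long = 0L

  private def rowHash(key: Any, value: Any): Long =
    MurmurHash3.productHash((key, value)).toLong

  def update(key: Any, value: Any): Unit = {
    val h = rowHash(key, value)
    xorAcc ^= h // order-insensitive, but duplicates cancel out
    sumAcc += h // order-insensitive, keeps duplicates visible
  }

  // Fold both accumulators into one value; the rotation avoids trivial cancellation.
  def getValue: Long = xorAcc ^ java.lang.Long.rotateLeft(sumAcc, 27)
}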
Does this PR introduce any user-facing change?
No
How was this patch tested?
Unit tested
Benchmark test:
tpcds (10gb): the overhead of checksum computation with UnsafeRowChecksum is 0.4%.
tpcds (3tb): the overhead of checksum computation with UnsafeRowChecksum is 0.72%.
Was this patch authored or co-authored using generative AI tooling?
No